[AWS SDK for JavaScript v3] Amazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレート
こんにちは、CX事業本部 Delivery部の若槻です。
Amazon Athenaは、非常に大きなデータセットに対して実行されるクエリをサポートするため、結果セットが非常に大きい場合は、一度にすべてのデータを取得することは望ましくありません。その場合、ページネーションを使用することで、各ページで必要なデータのみを取得し、余分なデータを取得しなくて済むため、クエリの実行時間を短縮することができます。
今回は、AWS SDK for JavaScript v3でAmazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレートを作ってみました。
やってみた
クエリ対象のテーブル作成
クエリ対象の適当なデータとして、Amazon CloudTrailのログデータを使ってみます。
Amazon CloudTrailコンソールで[Create Athena table]をクリック。
CloudTrailログが格納されているS3 Bucketを選択し、クリックしてテーブル作成。
クエリ対象のテーブルが作成できました。
テンプレート
必要な依存関係をインスントールします。
npm install @aws-sdk/client-athena
下記はAmazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレートです。
import { AthenaClient, GetQueryExecutionCommand, GetQueryResultsCommand, StartQueryExecutionCommand, QueryExecutionState, } from '@aws-sdk/client-athena'; const REGION = 'ap-northeast-1'; const DATABASE = 'default'; const MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND = Number(process.env.MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND) || 10; const QUERY_STATEMENT = process.argv[2]; const athena = new AthenaClient({ region: REGION }); const executeQuery = async (query: string): Promise<string> => { const { QueryExecutionId } = await athena.send( new StartQueryExecutionCommand({ QueryString: query, QueryExecutionContext: { Database: DATABASE }, }) ); if (QueryExecutionId === undefined) throw new Error('QueryExecutionId is undefined.'); return QueryExecutionId; }; const getPage = async ( queryExecutionId: string, pageToken?: string ): Promise<any> { // クエリ実行結果として、結果セットとネクストトークンを取得 const { ResultSet, NextToken } = await athena.send( new GetQueryResultsCommand({ QueryExecutionId: queryExecutionId, MaxResults: MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND, NextToken: pageToken, }) ); if (ResultSet === undefined || ResultSet.Rows === undefined) throw new Error('ResultSet is undefined.'); console.log(ResultSet.Rows.length); // ネクストトークンが含まれていた場合、次のページからクエリ実行結果を取得(再帰実行) if (NextToken) { const nextPage = await getPage(queryExecutionId, NextToken); if (Array.isArray(nextPage)) { return [...ResultSet.Rows, ...nextPage]; } else { return ResultSet.Rows; } } return ResultSet; } const getResults = async (query: string): Promise<any> => { const queryExecutionId = await executeQuery(query); // クエリ実行完了待機 let queryExecutionState = QueryExecutionState.RUNNING; while (queryExecutionState === QueryExecutionState.RUNNING) { await new Promise((resolve) => setTimeout(resolve, 1000)); const { QueryExecution } = await athena.send( new GetQueryExecutionCommand({ QueryExecutionId: queryExecutionId, }) ); if ( QueryExecution == undefined || QueryExecution.Status == undefined || QueryExecution.Status.State == undefined ) throw new Error('QueryExecution is undefined.'); queryExecutionState = QueryExecution.Status.State as QueryExecutionState; } if (queryExecutionState !== QueryExecutionState.SUCCEEDED) { throw new Error( `Query ${queryExecutionId} failed with status ${queryExecutionState}` ); } return getPage(queryExecutionId); }; getResults(QUERY_STATEMENT);
実行してみると、ページネーション毎にデータが取得できていることが確認できました。
$ npx ts-node script.ts "SELECT * FROM cloudtrail_logs_cm_members_XXXXXXXXXXXX LIMIT 100" 10 10 10 10 10 10 10 10 10 10 1
GetQueryResultsCommandのMaxResultsで10
を指定し、クエリステートメントでLIMIT 100
を指定しているため、11回に分けて100件のデータが取得できています。(最初の結果セットにはヘッダー行が含まれるため、100件目のデータは11回目の試行で取得されています。)
Athena-Queryという選択肢もある
ここまでAWS SDK for JavaScript v3のネイティブなメソッドのみを使ってAthenaクエリ実行の結果取得を実装してみましたが、実は上記の実行完了待機およびページネーション処理をラップしたAthena-Queryというライブラリがあります。
Athena-Queryは弊社クラスメソッド公式のリポジトリで管理されており、以前私も下記の使ってみたブログを投稿しました。選択肢としてご検討ください。
参考
以上